package next.fire.spinus.logx.utils.async;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by daibing on 2020/12/10.
 */
public class AsyncMergeProcessor<T> {
    private final String name;
    private final int threadSize;
    private final BlockingQueue<T> queue;
    private LooperPool pool;

    public AsyncMergeProcessor(String name, int threadSize, int bufferSize) {
        this.name = name;
        this.threadSize = threadSize;
        this.queue = new ArrayBlockingQueue<T>(bufferSize);
    }

    public void submit(T item) {
        while (!queue.offer(item)) {
            T old = queue.poll();
            this.handleLostData(old);
        }
    }

    public void put(T item) {
        try {
            queue.put(item);
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    protected int pageSize() {
        // impl by sub class
        return 100;
    }

    protected int queuePollTimeoutMillis() {
        // impl by sub class
        return 1000;
    }

    protected void handleLostData(T item) {
        // impl by sub class
    }

    protected int handlePageData(List<T> itemList) {
        // impl by sub class, return valid data count handle by sub class
        return 0;
    }

    protected void handlePageErrorData(List<T> itemList, Throwable t) {
        // impl by sub class
    }

    protected void print(int queueSize, int itemsSize, int handleCount) {
        // impl by sub class
    }

    public void startup() {
        System.out.println(String.format("start async merge %s processor ... ", name));
        final int pageSize = pageSize();
        final int queuePollTimeoutMillis = queuePollTimeoutMillis();
        pool = new LooperPool(threadSize, new LooperPool.LooperBuilder() {
            @Override
            public Looper build() {
                return new Looper(name, 50, 1500) {
                    private List<T> itemList;

                    @Override
                    protected void loop() throws Throwable {
                        if (itemList == null || itemList.isEmpty()) {
                            itemList = listByPageSize(pageSize, queuePollTimeoutMillis);
                        }
                        int count = handlePageData(itemList);
                        print(queue.size(), itemList.size(), count);
                        itemList.clear();
                    }

                    @Override
                    protected void loopThrowable(Throwable t) {
                        handlePageErrorData(itemList, t);
                        itemList.clear();
                    }
                };
            }
        });
        pool.startup();
    }

    public void shutdown() {
        if (pool != null) {
            pool.shutdown();
        }
    }

    private List<T> listByPageSize(int pageSize, int queuePollTimeoutMillis) throws InterruptedException {
        List<T> items = new ArrayList<>(pageSize);
        for (int i = 0; i < pageSize; i++) {
            T item = queue.poll(queuePollTimeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                break;
            }
            items.add(item);
        }
        return items;
    }
}
